package com.jplus.plugins.rpc.impl.handler.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.jplus.plugins.rpc.Constant;
import com.jplus.plugins.rpc.bean.RpcRequest;
import com.jplus.plugins.rpc.bean.RpcResponse;
import com.jplus.plugins.rpc.bean.Service;
import com.jplus.plugins.rpc.bean.ZNode;
import com.jplus.plugins.rpc.impl.handler.netty.coder.NettyDecoder;
import com.jplus.plugins.rpc.impl.handler.netty.coder.NettyEncoder;
import com.jplus.plugins.rpc.impl.iface.IAgreement;
import com.jplus.plugins.rpc.impl.iface.IWatcher;

/**
 * Netty-based implementation of {@link IAgreement}.
 * <p>
 * {@link #opServer} starts an NIO server on a dedicated thread and dispatches
 * decoded requests to a {@link NettyHandler}; {@link #opClient} opens a
 * connection to a service node, sends one request and waits for its response.
 * <p>
 * The class keeps no per-call state in instance fields, so a single instance
 * may be used for several {@code opClient} calls at the same time.
 *
 * @author Yuanqy
 */
public class NettyService implements IAgreement {
	private static final Logger logger = LoggerFactory.getLogger(NettyService.class);

	/**
	 * Starts the RPC server on {@code Constant.RPC_LOCAL_HOST}:{@code Constant.NETTY_PORT}
	 * in a new thread named {@code NettyServer-Thread} and returns immediately.
	 * <p>
	 * Once the port is bound, {@code watcher.process(true)} is invoked from the
	 * server thread. Startup failures are logged; they are not rethrown to the
	 * caller because they happen on the server thread.
	 *
	 * @param watcher callback notified after the server socket has been bound
	 * @param handler map handed to the {@link NettyHandler} that processes requests
	 * @throws Exception declared by {@link IAgreement}; not thrown by this implementation itself
	 */
	@Override
	public void opServer(final IWatcher watcher, final Map<String, Service> handler) throws Exception {
		final String host = Constant.RPC_LOCAL_HOST;
		final int port = Constant.NETTY_PORT;

		// The server blocks until its channel is closed, so it runs on its own
		// thread; the watcher callback tells the caller when it is ready.
		new Thread("NettyServer-Thread") {
			@Override
			public void run() {
				try {
					NettyService.this.serve(host, port, watcher, handler);
				} catch (InterruptedException e) {
					// Preserve the interrupt status for whoever owns this thread.
					Thread.currentThread().interrupt();
					logger.error("RPC [netty] server thread interrupted at addr: " + host + ":" + port, e);
				} catch (Exception e) {
					// NOTE(review): the watcher is never notified when startup fails;
					// consider signalling failure if IWatcher supports it.
					logger.error("RPC [netty] server failed at addr: " + host + ":" + port, e);
				}
			}
		}.start();
	}

	/**
	 * Binds the server socket, notifies the watcher and then blocks until the
	 * server channel is closed. Both event loop groups are always shut down.
	 */
	private void serve(String host, int port, IWatcher watcher, final Map<String, Service> handler) throws Exception {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel channel) throws Exception {
					channel.pipeline().addLast(new NettyDecoder()) // decode incoming RPC requests
							.addLast(new NettyEncoder()) // encode outgoing RPC responses
							.addLast(new NettyHandler(handler)); // process the request
				}
			}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
			ChannelFuture future = bootstrap.bind(host, port).sync();
			logger.info("RPC [netty] server started at addr: {}:{}", host, port);
			// Callback: lets the caller proceed now that the port is bound.
			watcher.process(true);
			// Blocks this thread until the server channel is closed.
			future.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}

	// ======================================================================================

	/**
	 * Sends {@code request} to the node described by {@code serNode} and waits
	 * for the response.
	 * <p>
	 * A new event loop group and connection are created for every call and
	 * released before the method returns.
	 *
	 * @param request the request to send
	 * @param serNode supplies the host and port of the target service
	 * @return the response received from the server, or {@code null} if the
	 *         connection was closed before any response arrived
	 * @throws IllegalStateException if the channel pipeline reported an error
	 *         before a response was received (the original error is the cause)
	 * @throws Exception if connecting, writing or waiting fails or is interrupted
	 */
	@Override
	public RpcResponse opClient(RpcRequest request, ZNode serNode) throws Exception {
		EventLoopGroup group = new NioEventLoopGroup();
		// Per-call state: keeps concurrent calls from seeing each other's response.
		final ResponseCollector collector = new ResponseCollector();
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel channel) throws Exception {
					channel.pipeline().addLast(new NettyEncoder()) // encode the outgoing request
							.addLast(new NettyDecoder()) // decode the incoming response
							.addLast(collector); // capture the response for this call
				}
			}).option(ChannelOption.SO_KEEPALIVE, true);

			ChannelFuture future = bootstrap.connect(serNode.getHost(), serNode.getPort()).sync();
			future.channel().writeAndFlush(request).sync();
			RpcResponse response = collector.awaitResponse();
			// Close actively instead of waiting for the peer to hang up, which
			// could block forever on a keep-alive connection.
			future.channel().close().sync();
			return response;
		} finally {
			// Also closes the channel if one of the steps above failed.
			group.shutdownGracefully();
		}
	}

	/**
	 * Client-side handler that captures the single response of one call and
	 * releases the waiting caller when the response arrives, the channel
	 * becomes inactive, or an error is raised in the pipeline.
	 */
	private static final class ResponseCollector extends SimpleChannelInboundHandler<RpcResponse> {
		private final CountDownLatch done = new CountDownLatch(1);
		private volatile RpcResponse response;
		private volatile Throwable failure;

		@Override
		protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
			response = msg;
			done.countDown();
		}

		@Override
		public void channelInactive(ChannelHandlerContext ctx) throws Exception {
			// The connection is gone: no response can arrive any more.
			done.countDown();
			super.channelInactive(ctx);
		}

		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
			logger.error("client caught exception", cause);
			failure = cause;
			// Release the caller, otherwise it would wait forever.
			done.countDown();
			ctx.close();
		}

		/**
		 * Blocks until the call has finished one way or another.
		 *
		 * @return the received response, or {@code null} if the channel closed without one
		 * @throws IllegalStateException if an error was caught and no response was received
		 * @throws InterruptedException if the waiting thread is interrupted
		 */
		RpcResponse awaitResponse() throws InterruptedException {
			done.await();
			if (response == null && failure != null) {
				throw new IllegalStateException("RPC call failed before a response was received", failure);
			}
			return response;
		}
	}

}
